#include "sync.h"
#include "log.h"
#include "util.h"
#include "msg.h"
#include "conf.h"

#include <stdio.h>
#include <string.h>
#include <getopt.h>
#include <stdlib.h>
#include <limits.h>
#include <pthread.h>
#include <unistd.h>  //sleep
#include <errno.h>

#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <signal.h> //pthread_kill,SIGALRM

// Per-queue sync state: allocated by init_sync() with one element per configured queue name
static msg_queue_item_t *g_msg_queue_item;

// Number of elements in g_msg_queue_item (set by init_sync(), iterated by fini_sync())
static int g_queue_num = 0;


/**
** Split a comma-separated string into a utarray of strings.
**
** value : NUL-terminated list such as "a,b,c". It is tokenized in place with
**         strtok_r(), so the buffer is modified (delimiters are overwritten).
**         Empty fields are skipped by strtok_r(); whitespace around a field
**         is kept as part of the token. A NULL value yields an empty array.
**
** Returns a newly created array built with ut_str_icd (utarray documents this
** icd as storing its own copy of each pushed string). The caller releases it
** with utarray_free().
**/
UT_array * parse_array(char * value){
    UT_array *tokens;
    utarray_new(tokens, &ut_str_icd);
    if(value == NULL){
        //strtok_r() with a NULL string and a NULL save pointer is undefined
        return tokens;
    }
    const char delims[] = ",";
    char *save = NULL;
    char *tok;
    for(tok = strtok_r(value, delims, &save); tok != NULL; tok = strtok_r(NULL, delims, &save)){
        utarray_push_back(tokens, &tok);
    }
    return tokens;
}


/**
** 初始化
**/
int init_sync(){
    char sync_queues[1024] = {0};
	get_config_item("sync:sync_queues", sync_queues, 1024, NULL);
    int thread_num = get_config_item_int("pool:thread_num", 1);
    int timeout_secs = get_config_item_int("pool:timeout_secs", 1);
    if(!sync_queues[0]){
		ERROR("No parameter 'sync:sync_queues'");
		return -1;
    }
    UT_array *sync_queues_array = parse_array(sync_queues);
    int len = utarray_len(sync_queues_array);
    if(len<1){
		ERROR("No valid parameter 'sync:sync_queues'");
		return -1;
    }
    g_queue_num = len;
    g_msg_queue_item = (msg_queue_item_t *)calloc(len,sizeof(msg_queue_item_t));
    if(!g_msg_queue_item){
		ERROR("malloc msg_queue_item_t failed");
		return -1;
    }
    char **p;
    p = NULL;
    int n = 0;
    while ( (p=(char**)utarray_next(sync_queues_array,p))) {
        char * queue_name = *p;
        //define config item name
        char queue_path_config_item_name[PATH_MAX] = {0};
        snprintf (queue_path_config_item_name, PATH_MAX, "queue/%s:queue_path", queue_name);
        char sync_nodes_list_config_item_name[PATH_MAX] = {0};
        snprintf (sync_nodes_list_config_item_name, PATH_MAX, "queue/%s:sync_nodes_list", queue_name);
        //require config item value
        get_config_item(queue_path_config_item_name, (g_msg_queue_item+n)->queue_path, PATH_MAX, NULL);
        get_config_item(sync_nodes_list_config_item_name, (g_msg_queue_item+n)->sync_nodes_list, PATH_MAX, NULL);
        (g_msg_queue_item+n)->sync_nodes_array = parse_array( (g_msg_queue_item+n)->sync_nodes_list );
        (g_msg_queue_item+n)->thread_num = thread_num;
        (g_msg_queue_item+n)->timeout_secs = timeout_secs;
        (g_msg_queue_item+n)->timeout_tv.tv_sec = timeout_secs;
        (g_msg_queue_item+n)->timeout_tv.tv_usec = 0;
        (g_msg_queue_item+n)->pool_num = utarray_len((g_msg_queue_item+n)->sync_nodes_array);
        int ret = init_msg_queue_sync(g_msg_queue_item+n);
        if(ret<0){
            ERROR("sync_init failed : %s",(g_msg_queue_item+n)->sync_nodes_list);
            return -1;
        }
        ++n;
    }
    utarray_free(sync_queues_array);
    return 0;
}

/**
** Shut down synchronization: calls fini_msg_queue_sync() on each of the
** g_queue_num entries of g_msg_queue_item, in index order.
** Always returns 0.
**/
int fini_sync(){
    int idx = 0;
    while(idx < g_queue_num){
        fini_msg_queue_sync(&g_msg_queue_item[idx]);
        idx++;
    }
    return 0;
}

/**
** Message-reading main loop (thread entry point).
**
** arg : the msg_queue_item_t this thread serves.
**
** Blocks in msgrcv() on the item's System V queue and forwards every received
** text to sync_redis_command(). The loop ends when loop_thread_running becomes
** 0 (checked after each msgrcv() return) or when the queue has been removed
** (EIDRM). EINTR and all other msgrcv() errors are logged and the receive is
** retried.
**
** Always returns NULL.
**/
static void * read_msg_main_loop(void * arg) {
    msg_queue_item_t * msg_queue_ic = (msg_queue_item_t *)arg;
    INFO("read_msg_main_loop startup : %s",msg_queue_ic->queue_path);
    //begin to receive msg
    struct  sync_msg_buf msg;
    while(msg_queue_ic->loop_thread_running){
        memset(&msg, 0, sizeof(msg));
        ssize_t received = msgrcv(msg_queue_ic->msg_system_id, &msg, MSG_MAX_LEN, 0, 0);
        if(received < 0){
            int err = errno;  //save before any further call can overwrite it
            if(err == EIDRM){
                ERROR("msg queue is removed : %s",msg_queue_ic->queue_path);
                break;
            }
            if(err == EINTR){
                ERROR("msg queue is interepted : %s",msg_queue_ic->queue_path);
                continue;
            }
            ERROR("msgrcv error : %s", strerror(err));
            continue;
        }
        //A message of exactly MSG_MAX_LEN bytes overwrites every zeroed byte of
        //mtext, leaving no terminator; force one before treating it as a string.
        //Assumes mtext holds at least MSG_MAX_LEN bytes, as the msgrcv() size
        //argument above already requires. No data is lost: sync_redis_command()
        //copies at most MSG_MAX_LEN-1 characters anyway.
        msg.mtext[MSG_MAX_LEN - 1] = '\0';
        sync_redis_command(msg_queue_ic, msg.mtext);
    }
    INFO("read_msg_main_loop exit : %s",msg_queue_ic->queue_path);
    return NULL;
}

/**
** Initialize the per-thread context (thread-pool callback).
**
** pool_ctx_p : pool_context_t* with the redis address (ip_buf, port) and
**              connect timeout.
** thd_ctx_pp : in/out slot for the thread context. If *thd_ctx_pp is already
**              non-NULL nothing is done. Otherwise a thread_context_t is
**              allocated and connected to redis; on success it is stored in
**              *thd_ctx_pp, on any failure *thd_ctx_pp is left NULL and
**              everything allocated here is released.
**
** Always returns NULL; callers detect failure through *thd_ctx_pp.
**/
static void* sync_thd_ctx_init_func(void * pool_ctx_p, void ** thd_ctx_pp){
    pool_context_t * pool_ctx = (pool_context_t *)pool_ctx_p;
    if((*thd_ctx_pp) != NULL){
        //context already set up for this thread
        return NULL;
    }
    thread_context_t * thd_ctx = (thread_context_t *)calloc(1,sizeof(thread_context_t));
    if(!thd_ctx){
        ERROR("sync_thd_ctx_init_func failed");
        return NULL;
    }
    thd_ctx->conn = redisConnectWithTimeout(pool_ctx->ip_buf, pool_ctx->port, pool_ctx->timeout_tv);
    //hiredis returns NULL when the context itself cannot be allocated, and a
    //context with err set when the connection attempt failed
    if(thd_ctx->conn == NULL || thd_ctx->conn->err){
        if(thd_ctx->conn != NULL){
            redisFree(thd_ctx->conn);
        }
        free(thd_ctx);
        (*thd_ctx_pp) = NULL;
        ERROR("Connect to redisServer failed : %s:%d",pool_ctx->ip_buf, pool_ctx->port);
        //must not fall through: conn and thd_ctx have just been freed
        return NULL;
    }
    redisEnableKeepAlive(thd_ctx->conn);  //keepalive
    (*thd_ctx_pp) = thd_ctx;
    INFO("Connect to redisServer success : %s:%d",pool_ctx->ip_buf, pool_ctx->port);
    return NULL;
}

/**
** Destroy the per-thread context (thread-pool callback).
**
** pool_ctx_p : unused.
** thd_ctx_p  : thread_context_t* created by sync_thd_ctx_init_func(), or NULL
**              when that initialization failed; NULL is a no-op.
**
** Frees the redis connection and the context itself. Always returns NULL.
**/
static void* sync_thd_ctx_fini_func(void * pool_ctx_p, void* thd_ctx_p){
    (void)pool_ctx_p;
    thread_context_t * thd_ctx = (thread_context_t *)thd_ctx_p;
    if(thd_ctx == NULL){
        //init never produced a context for this thread; nothing to release
        return NULL;
    }
    redisFree(thd_ctx->conn);
    free(thd_ctx);
    return NULL;
}

/**
** 同步任务函数
**/
static void* sync_routine(void * pool_ctx_p, void ** thd_ctx_pp, void * arg){
    pool_context_t * pool_ctx = (pool_context_t *)pool_ctx_p;
    thread_context_t * thd_ctx = (thread_context_t *)(*thd_ctx_pp);
    char * command = (char *)arg;
    INFO("execute command to %s:%d @ [%s]", pool_ctx->ip_buf, pool_ctx->port, command);
    redisContext *conn = thd_ctx->conn;
    redisReply * reply = NULL;
    int retry = 2;
    int result = 0;
    while(retry-->0 && !result){
        reply = (redisReply*)redisCommand(conn, command); 
        if(NULL == reply){
            ERROR("execute command failed(hard) to %s:%d @ [%s], error : %s", pool_ctx->ip_buf, pool_ctx->port, command, conn->errstr);
            if(retry>0){
                int ret = redisReconnect(thd_ctx->conn);
                if(!(ret == REDIS_OK)){
                    ERROR("ReConnect to redisServer failed : %s:%d",pool_ctx->ip_buf, pool_ctx->port);
                }
                INFO("ReConnect to redisServer success : %s:%d",pool_ctx->ip_buf, pool_ctx->port);
                continue;
            }else{
                break;
            }
        }else{
            switch(reply->type){
                case REDIS_REPLY_STRING:{
                    result = 1;
                    break;
                }
                case REDIS_REPLY_ARRAY:{
                    result = 1;
                    break;
                }
                case REDIS_REPLY_INTEGER:{
                    result = 1;
                    break;
                }
                case REDIS_REPLY_NIL:{
                    //DO NOTHING
                    ERROR("execute command failed(soft) to %s:%d @ [%s], error : %s", pool_ctx->ip_buf, pool_ctx->port, command, conn->errstr);
                    break;
                }
                case REDIS_REPLY_STATUS:{
                    result = (strcasecmp(reply->str,"OK") == 0)?1:0;
                    break;
                }
                case REDIS_REPLY_ERROR:{
                    //DO NOTHING
                    ERROR("execute command failed(soft) to %s:%d @ [%s], error : %s", pool_ctx->ip_buf, pool_ctx->port, command, conn->errstr);
                    break;
                }
                default:{
                    //DO NOTHING
                    break;
                }
            }
            freeReplyObject(reply);
        }
    }
    if(result){
        INFO("execute command success to %s:%d @ [%s]", pool_ctx->ip_buf, pool_ctx->port, command);
    }else{
        ERROR("execute command failed to %s:%d @ [%s]", pool_ctx->ip_buf, pool_ctx->port, command);
    }
    free(command);  //free memory
    return NULL;
}


/**
** Undo the pool setup done by init_msg_queue_sync(): destroy the first
** `created` pools, free the pool and pool-context arrays, and reset both
** pointers to NULL so a later fini_msg_queue_sync() cannot free them again.
**/
static void release_msg_queue_pools(msg_queue_item_t * msg_queue_ic, int created){
    if(msg_queue_ic->pools){
        int k;
        for(k=0; k<created; ++k){
            if(msg_queue_ic->pools[k] != NULL){
                tpool_destroy(msg_queue_ic->pools[k]);
                msg_queue_ic->pools[k] = NULL;
            }
        }
    }
    free(msg_queue_ic->pools);
    msg_queue_ic->pools = NULL;
    free(msg_queue_ic->pool_cxt);
    msg_queue_ic->pool_cxt = NULL;
}

/**
** Initialize one queue item.
**
** msg_queue_ic : item whose sync_nodes_array, thread_num, timeout_tv and
**                queue_path have already been filled in by the caller.
**
** Steps:
**   1. allocate one pool context and one pool slot per entry of sync_nodes_array;
**   2. for every entry that parse_addr() accepts, create a thread pool bound
**      to that address (rejected entries are skipped, so trailing pool slots
**      may stay NULL);
**   3. create/open the System V message queue keyed by queue_path;
**   4. start the detached reader thread read_msg_main_loop().
**
** Returns 0 on success, -1 on failure. On failure every pool created here is
** destroyed and pools / pool_cxt are freed and set to NULL; sync_nodes_array
** is left untouched (it was created by the caller).
**/
int init_msg_queue_sync(msg_queue_item_t * msg_queue_ic){
    int n = 0;
    char **p = NULL;
    key_t  key;
    pthread_attr_t attr;

    int len = utarray_len(msg_queue_ic->sync_nodes_array);
    if(!len){
        return -1;
    }
    msg_queue_ic->pool_cxt = (pool_context_t *)calloc(len,sizeof(pool_context_t));
    if(!msg_queue_ic->pool_cxt){
        return -1;
    }
    msg_queue_ic->pools = (tpool_t **)calloc(len,sizeof(tpool_t*));
    if(!msg_queue_ic->pools){
        goto fail;
    }
    while ( (p=(char**)utarray_next(msg_queue_ic->sync_nodes_array,p))) {
        pool_context_t * ctx = msg_queue_ic->pool_cxt + n;
        //NOTE(review): 20 is presumably the capacity of ip_buf - confirm against its declaration
        if (parse_addr(*p, ctx->ip_buf, 20, &ctx->port) != 0) {
            continue;  //unparsable node address: skip it
        }
        ctx->timeout_tv.tv_sec = msg_queue_ic->timeout_tv.tv_sec;
        ctx->timeout_tv.tv_usec = msg_queue_ic->timeout_tv.tv_usec;
        msg_queue_ic->pools[n] = tpool_create(msg_queue_ic->thread_num,ctx,sync_thd_ctx_init_func,sync_thd_ctx_fini_func);
        if( ! msg_queue_ic->pools[n] ){
            ERROR("tpool_create failed : %s:%d", ctx->ip_buf, ctx->port);
            goto fail;
        }
        ++n;
    }
    //Construct system msg queue
    touch(msg_queue_ic->queue_path, MSG_QUEUE_PERMS);
    key = ftok(msg_queue_ic->queue_path, MSG_PROJECT_ID);
    if(key == -1){
        ERROR("ftok error : %s", strerror(errno));
        goto fail;
    }
    msg_queue_ic->msg_system_id = msgget(key, IPC_CREAT | MSG_QUEUE_PERMS);
    if(msg_queue_ic->msg_system_id == -1){
        ERROR("msgget error : %s", strerror(errno));
        goto fail;
    }
    //the flag must be set before the thread starts, since the thread tests it immediately
    msg_queue_ic->loop_thread_running = 1;
    pthread_attr_init (&attr);
    pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
    int rc = pthread_create(&msg_queue_ic->loop_thread_id, &attr, read_msg_main_loop, (void *)msg_queue_ic);
    pthread_attr_destroy (&attr);
    if (rc) {
        msg_queue_ic->loop_thread_running = 0;
        //pthread_create() reports failure through its return value, not errno
        ERROR("loop_thread create error : %s", strerror(rc) );
        goto fail;
    }
    INFO("init_msg_queue_sync success : %s",msg_queue_ic->queue_path);
    return 0;

fail:
    release_msg_queue_pools(msg_queue_ic, n);
    return -1;
}


/**
** Tear down one queue item.
**
** Stops the reader thread (if one was started), destroys every thread pool,
** and frees the pool array, the pool-context array and sync_nodes_array.
** Every released pointer is reset to NULL, so calling this on an item that
** was never (or only partially) initialized, or calling it twice, is harmless.
**
** NOTE(review): the reader thread is detached and is not waited for, so it may
** still be running while the pools are destroyed - confirm this is acceptable.
**
** Always returns 0.
**/
int fini_msg_queue_sync(msg_queue_item_t * msg_queue_ic){
    if(msg_queue_ic->loop_thread_running){
        msg_queue_ic->loop_thread_running = 0;
        //Interrupt the blocking msgrcv() so the loop thread sees EINTR and
        //re-checks the flag. Only sent when a thread was actually started;
        //otherwise loop_thread_id is not a valid thread id.
        //NOTE(review): presumably a SIGALRM handler is installed elsewhere -
        //with the default disposition this signal terminates the process.
        pthread_kill(msg_queue_ic->loop_thread_id, SIGALRM);
    }
    int n;
    if(msg_queue_ic->pools){
        for(n=0; n<msg_queue_ic->pool_num; ++n){
            if( msg_queue_ic->pools[n] != NULL){
                INFO("destroy pool %s:%d",(msg_queue_ic->pool_cxt+n)->ip_buf, (msg_queue_ic->pool_cxt+n)->port);
                tpool_destroy(msg_queue_ic->pools[n]);
                msg_queue_ic->pools[n] = NULL;
            }
        }
    }
    //free(NULL) is a no-op, so no guards are needed
    free(msg_queue_ic->pools);
    msg_queue_ic->pools = NULL;
    free(msg_queue_ic->pool_cxt);
    msg_queue_ic->pool_cxt = NULL;
    if(msg_queue_ic->sync_nodes_array){
        utarray_free(msg_queue_ic->sync_nodes_array);
        msg_queue_ic->sync_nodes_array = NULL;
    }
    INFO("fini_msg_queue_sync success : %s",msg_queue_ic->queue_path);
    return 0;
}


/**
** Dispatch one redis command to every configured node.
**
** msg_queue_ic  : queue item whose pools receive the work.
** param_command : command text; each pool gets its own heap copy, truncated to
**                 MSG_MAX_LEN-1 characters, which sync_routine() later frees.
**
** Returns -1 when the item has no pools (pool_num is 0) or when a copy cannot
** be allocated (pools handled before that point have already been given the
** command); returns 0 otherwise.
**/
int sync_redis_command(msg_queue_item_t * msg_queue_ic, const char * param_command){
    INFO("handle command : %s", param_command);
    if(msg_queue_ic->pool_num == 0){
        ERROR("No avaliable pools : %s",msg_queue_ic->queue_path);
        return -1;
    }
    if(msg_queue_ic->pools == NULL){
        return 0;
    }
    for(int idx = 0; idx < msg_queue_ic->pool_num; idx++){
        tpool_t * pool = msg_queue_ic->pools[idx];
        if(pool == NULL){
            continue;  //slot without a pool
        }
        char * copy = (char *)calloc(MSG_MAX_LEN,sizeof(char));
        if(copy == NULL){
            ERROR("No avaliable memory!");
            return -1;
        }
        snprintf (copy, MSG_MAX_LEN, "%s", param_command);
        tpool_add_work(pool, sync_routine, (void *)copy);
    }
    return 0;
}
